﻿<h1><code>sync</code>标准库包中提供的并发同步技术</h1>

<p>
<a href="channel-use-cases.html">通道用例大全</a>一文中介绍了很多通过使用通道来实现并发同步的用例。
事实上，通道并不是Go支持的唯一的一种并发同步技术。而且对于一些特定的情形，通道并不是最有效和可读性最高的同步技术。
本文下面将介绍<code>sync</code>标准库包中提供的各种并发同步技术。相对于通道，这些技术对于某些情形更加适用。
</p>

<p>
<code>sync</code>标准库包提供了一些用于实现并发同步的类型。这些类型适用于各种不同的内存顺序需求。
对于这些特定的需求，这些类型使用起来比通道效率更高，代码实现更简洁。
</p>

<p><i>
（请注意：为了避免各种异常行为，最好不要复制<code>sync</code>标准库包中提供的类型的值。）
</i></p>

<a class="anchor" id="waitgroup"></a>
<h3><code>sync.WaitGroup</code>（等待组）类型</h3>

<div>
<p>
每个<code>sync.WaitGroup</code>值在内部维护着一个计数，此计数的初始默认值为零。
</p>

<p>
<code>*sync.WaitGroup</code>类型有<a href="https://golang.google.cn/pkg/sync/#WaitGroup">三个方法</a>：<code>Add(delta int)</code>、<code>Done()</code>和<code>Wait()</code>。
</p>

对于一个可寻址的<code>sync.WaitGroup</code>值<code>wg</code>，
<ul>
<li>
	我们可以使用方法调用<code>wg.Add(delta)</code>来改变值<code>wg</code>维护的计数。
</li>
<li>
	方法调用<code>wg.Done()</code>和<code>wg.Add(-1)</code>是完全等价的。
</li>
<li>
	如果一个<code>wg.Add(delta)</code>或者<code>wg.Done()</code>调用将<code>wg</code>维护的计数更改成一个负数，一个恐慌将产生。
</li>
<li>
	当一个协程调用了<code>wg.Wait()</code>时，
	<ul>
	<li>
		如果此时<code>wg</code>维护的计数为零，则此<code>wg.Wait()</code>此操作为一个空操作（no-op）；
	</li>
	<li>
		否则（计数为一个正整数），此协程将进入阻塞状态。
		当以后其它某个协程将此计数更改至0时（一般通过调用<code>wg.Done()</code>），此协程将重新进入运行状态（即<code>wg.Wait()</code>将返回）。
	</li>
	</ul>
</li>
</ul>

<p>
请注意<code>wg.Add(delta)</code>、<code>wg.Done()</code>和<code>wg.Wait()</code>分别是<code>(&amp;wg).Add(delta)</code>、<code>(&amp;wg).Done()</code>和<code>(&amp;wg).Wait()</code>的简写形式。
</p>

一般，一个<code>sync.WaitGroup</code>值用来让某个协程等待其它若干协程都先完成它们各自的任务。
一个例子：

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wg sync.WaitGroup
	wg.Add(N)
	for i := 0; i < N; i++ {
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			fmt.Println("Done:", i)
			wg.Done() // <=> wg.Add(-1)
		}()
	}

	wg.Wait()
	// 所有的元素都保证被初始化了。
	fmt.Println("values:", values)
}
</code></pre>

在此例中，主协程等待着直到其它5个协程已经将各自负责的元素初始化完毕此会打印出各个元素值。
这里是一个可能的程序执行输出结果：

<pre class="output"><code>Done: 4
Done: 1
Done: 3
Done: 0
Done: 2
values: [71 89 50 62 60]
</code></pre>

我们可以将上例中的<code>Add</code>方法调用拆分成多次调用：
<pre class="line-numbers"><code class="language-go">...
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1) // 将被执行5次
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			wg.Done()
		}()
	}
...
</code></pre>

<p>
</p>

一个<code>*sync.WaitGroup</code>值的<code>Wait</code>方法可以在多个协程中调用。
当对应的<code>sync.WaitGroup</code>值维护的计数降为0，这些协程都将得到一个（广播）通知而结束阻塞状态。

<pre class="line-numbers"><code class="language-go">func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wgA, wgB sync.WaitGroup
	wgA.Add(N)
	wgB.Add(1)

	for i := 0; i < N; i++ {
		i := i
		go func() {
			wgB.Wait() // 等待广播通知
			log.Printf("values[%v]=%v \n", i, values[i])
			wgA.Done()
		}()
	}

	// 下面这个循环保证将在上面的任何一个
	// wg.Wait调用结束之前执行。
	for i := 0; i < N; i++ {
		values[i] = 50 + rand.Int31n(50)
	}
	wgB.Done() // 发出一个广播通知
	wgA.Wait()
}
</code></pre>

<p>
</p>

<p>
一个<code>WaitGroup</code>可以在它的一个<code>Wait</code>方法返回之后被重用。
但是请注意，当一个<code>WaitGroup</code>值维护的基数为零时，它的带有正整数实参的<code>Add</code>方法调用不能和它的<code>Wait</code>方法调用并发运行，否则将可能出现数据竞争。
</p>

</div>

<a class="anchor" id="once"></a>
<h3><code>sync.Once</code>类型</h3>

<div>
<p>
每个<code>*sync.Once</code>值有一个<code>Do(f func())</code>方法。
此方法只有一个类型为<code>func()</code>的参数。
</p>

<p>
对一个可寻址的<code>sync.Once</code>值<code>o</code>，<code>o.Do()</code>（即<code>(&amp;o).Do()</code>的简写形式）方法调用可以在多个协程中被多次并发地执行，
这些方法调用的实参应该（但并不强制）为同一个函数值。
在这些方法调用中，有且只有一个调用的实参函数（值）将得到调用。
此被调用的实参函数保证在任何<code>o.Do()</code>方法调用返回之前退出。
换句话说，被调用的实参函数内的代码将在任何<code>o.Do()</code>方法返回调用之前被执行。
</p>

<p>
一般来说，一个<code>sync.Once</code>值被用来确保一段代码在一个并发程序中被执行且仅被执行一次。
</p>

一个例子：
<pre class="line-numbers"><code class="language-go">package main

import (
	"log"
	"sync"
)

func main() {
	log.SetFlags(0)

	x := 0
	doSomething := func() {
		x++
		log.Println("Hello")
	}

	var wg sync.WaitGroup
	var once sync.Once
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(doSomething)
			log.Println("world!")
		}()
	}

	wg.Wait()
	log.Println("x =", x) // x = 1
}
</code></pre>

<p>
在此例中，<code>Hello</code>将仅被输出一次，而<code>world!</code>将被输出5次，并且<code>Hello</code>肯定在所有的5个<code>world!</code>之前输出。
</p>

</div>

<a class="anchor" id="mutex"></a>
<h3><code>sync.Mutex</code>（互斥锁）和<code>sync.RWMutex</code>（读写锁）类型</h3>

<div>
<p>
<code>*sync.Mutex</code>和<code>*sync.RWMutex</code>类型都实现了<a href="https://golang.google.cn/pkg/sync/#Locker"><code>sync.Locker</code>接口类型</a>。
所以这两个类型都有两个方法：<code>Lock()</code>和<code>Unlock()</code>，用来保护一份数据不会被多个使用者同时读取和修改。
</p>

<p>
除了<code>Lock()</code>和<code>Unlock()</code>这两个方法，<code>*sync.RWMutex</code>类型还有两个另外的方法：<code>RLock()</code>和<code>RUnlock()</code>，用来支持多个读取者并发读取一份数据但防止此份数据被某个数据写入者和其它数据访问者（包括读取者和写入者）同时使用。
</p>

<p>
（注意：这里的<b><i>数据读取者</i></b>和<b><i>数据写入者</i></b>不应该从字面上理解。有时候某些数据读取者可能修改数据，而有些数据写入者可能只读取数据。）
</p>

<p>
一个<code>Mutex</code>值常称为一个互斥锁。
一个<code>Mutex</code>零值为一个尚未加锁的互斥锁。
一个（可寻址的）<code>Mutex</code>值<code>m</code>只有在未加锁状态时才能通过<code>m.Lock()</code>方法调用被成功加锁。
换句话说，一旦<code>m</code>值被加了锁（亦即某个<code>m.Lock()</code>方法调用成功返回），
一个新的加锁试图将导致当前协程进入阻塞状态，直到此<code>Mutex</code>值被解锁为止（通过<code>m.Unlock()</code>方法调用）。
</p>

<p>
注意：<code>m.Lock()</code>和<code>m.Unlock()</code>分别是<code>(&amp;m).Lock()</code>和<code>(&amp;m).Unlock()</code>的简写形式。
</p>

一个使用<code>sync.Mutex</code>的例子：
<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"runtime"
	"sync"
)

type Counter struct {
	m sync.Mutex
	n uint64
}

func (c *Counter) Value() uint64 {
	c.m.Lock()
	defer c.m.Unlock()
	return c.n
}

func (c *Counter) Increase(delta uint64) {
	c.m.Lock()
	c.n += delta
	c.m.Unlock()
}

func main() {
	var c Counter
	for i := 0; i < 100; i++ {
		go func() {
			for k := 0; k < 100; k++ {
				c.Increase(1)
			}
		}()
	}

	// 此循环仅为演示目的。
	for c.Value() < 10000 {
		runtime.Gosched()
	}
	fmt.Println(c.Value()) // 10000
}
</code></pre>

<p>
在上面这个例子中，一个<code>Counter</code>值使用了一个<code>Mutex</code>字段来确保它的字段<code>n</code>永远不会被多个协程同时使用。
</p>

<p>
一个<code>RWMutex</code>值常称为一个读写互斥锁。
对于一个可寻址的<code>RWMutex</code>值<code>rwm</code>，数据写入者可以通过方法调用<code>rwm.Lock()</code>获取<code>rwm</code>的写锁，或者通过<code>m.RLock()</code>方法调用获取<code>rwm</code>的读锁。
方法调用<code>rwm.Unlock()</code>和<code>rwm.RUnlock()</code>用来释放<code>rwm</code>的写锁和读锁。
</p>

<p>
注意<code>rwm.Lock()</code>、<code>rwm.Unlock()</code>、<code>rwm.RLock()</code>和<code>rwm.RUnlock()</code>分别是<code>(&amp;rwm).Lock()</code>、<code>(&amp;rwm).Unlock()</code>、<code>(&amp;rwm).RLock()</code>和<code>(&amp;rwm).RUnlock()</code>的简写形式。
</p>

对于一个可寻址的<code>RWMutex</code>值<code>rwm</code>，下列规则存在：
<ul>
<li>
	一个数据写入者只能在<code>rwm</code>的写锁和读锁都尚未被获取持有的情况下才能被成功获取。
	换句话说，<code>rwm</code>的写锁在任何时刻最多只能被一个数据写入值成功获取持有，并且<code>rwm</code>的写锁和读锁不能同时被持有。
</li>
<li>
	当<code>rwm</code>的写锁被一个数据写入者所持有的时候，任何新的试图获取它的写锁或者读锁的操作都将导致当前协程进入阻塞状态，直到此写锁被释放，新的获取写锁或者读锁的试图才有机会成功。
</li>
<li>
	当<code>rwm</code>的读锁被某个数据读取者所获取持有，新的获取它的写锁的试图将导致当前协程进入阻塞状态。
	但是，一个新的获取它的读锁的试图将成功，除非此试图操作发生在某个被阻塞的获取写锁的试图之后（见下一条规则）。
	换句话说，一个读写互斥锁的读锁可以同时被多个数据读取者同时持有。
</li>
<li>
	假设<code>rwm</code>的读锁正在被某些数据读取者所持有，为了防止后续数据写入者没有机会成功获取写锁，后续发生在某个被阻塞的获取写锁的试图之后的所有获取读锁的试图将被阻塞。
</li>
<li>
	假设<code>rwm</code>的写锁正在被某个数据写入者所持有，（至少对于标准编译器来说，）为了防止后续数据读取者没有机会成功获取读锁，发生在此写锁下一次被释放之前的所有获取读锁的试图将在此写锁下一次被释放之后肯定取得成功，即使这些所有获取读锁的试图发生在一些仍被阻塞的获取写锁的试图之后。
</li>
</ul>

<p>
后两条规则是为了确保数据读取者和写入者都有机会执行它们的操作。
</p>

<p>
请注意：一个锁并不会绑定到一个协程上；换句话说，一个锁的获取者和此锁的持有者（以及释放者）可能不是一个协程，尽管在实践中这种情况比较少见。
</p>

<!--
https://github.com/golang/go/issues/17973
-->

在上一个例子中，如果<code>Value</code>方法被十分频繁调用而<code>Increase</code>方法并不频繁被调用，则<code>Counter</code>类型的<code>m</code>字段的类型可以更改为<code>sync.RWMutex</code>，从而使得执行效率更高，如下面的代码所示。

<pre class="line-numbers"><code class="language-go">...
type Counter struct {
	//m sync.Mutex
	m sync.RWMutex
	n uint64
}

func (c *Counter) Value() uint64 {
	//c.m.Lock()
	//defer c.m.Unlock()
	c.m.RLock()
	defer c.m.RUnlock()
	return c.n
}
...
</code></pre>

<p>
<code>sync.RWMutex</code>值的另一个应用场景是将一个写任务分隔成若干小的写任务。下一节中展示了一个这样的例子。
</p>

根据上面列出的后两条规则，下面这个程序最有可能输出<code>abdc</code>。

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"time"
	"sync"
)

func main() {
	var m sync.RWMutex
	go func() {
		m.RLock()
		fmt.Print("a")
		time.Sleep(time.Second)
		m.RUnlock()
	}()
	go func() {
		time.Sleep(time.Second * 1 / 4)
		m.Lock()
		fmt.Print("b")
		time.Sleep(time.Second)
		m.Unlock()
	}()
	go func() {
		time.Sleep(time.Second * 2 / 4)
		m.Lock()
		fmt.Print("c")
		m.Unlock()
	}()
	go func () {
		time.Sleep(time.Second * 3 / 4)
		m.RLock()
		fmt.Print("d")
		m.RUnlock()
	}()
	time.Sleep(time.Second * 3)
	fmt.Println()
}
</code></pre>

<p>
请注意，上例这个程序仅仅是为了解释和验证上面列出的读写锁的后两条加锁规则。
此程序使用了<code>time.Sleep</code>调用来做协程间的同步。<a href="concurrent-common-mistakes.html#sleep">这种所谓的同步方法不应该被使用在生产代码中</a>。
</p>

<code>sync.Mutex</code>和<code>sync.RWMutex</code>值也可以用来实现通知，尽管这不是Go中最优雅的方法来实现通知。
下面是一个使用了<code>Mutex</code>值来实现通知的例子。

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var m sync.Mutex
	m.Lock()
	go func() {
		time.Sleep(time.Second)
		fmt.Println("Hi")
		m.Unlock() // 发出一个通知
	}()
	m.Lock() // 等待通知
	fmt.Println("Bye")
}
</code></pre>

<p>
在此例中，<code>Hi</code>将确保在<code>Bye</code>之前打印出来。
关于<code>sync.Mutex</code>和<code>sync.RWMutex</code>值相关的内存顺序保证，请阅读<a href="memory-model.html#mutex">Go中的内存顺序保证</a>一文。
</p>

</div>

<a class="anchor" id="cond"></a>
<h3><code>sync.Cond</code>类型</h3>

<div>
<p>
<code>sync.Cond</code>类型提供了一种有效的方式来实现多个协程间的通知。
</p>

<p>
每个<code>sync.Cond</code>值拥有一个<code>sync.Locker</code>类型的名为<code>L</code>的字段。
此字段的具体值常常为一个<code>*sync.Mutex</code>值或者<code>*sync.RWMutex</code>值。
</p>

<p>
<code>*sync.Cond</code>类型有<a href="https://golang.google.cn/pkg/sync/#Cond">三个方法</a>：<code>Wait()</code>、<code>Signal()</code>和<code>Broadcast()</code>。
</p>

每个<code>Cond</code>值维护着一个先进先出等待协程队列。
对于一个可寻址的<code>Cond</code>值<code>c</code>，
<ul>
<li>
	<code>c.Wait()</code>必须在<code>c.L</code>字段值的锁被成功获取的时候调用；否则，<code>c.Wait()</code>调用将造成一个恐慌。
	一个<code>c.Wait()</code>调用将
	<ol>
	<li>
		首先将当前协程推入到<code>c</code>所维护的等待协程队列；
	</li>
	<li>
		然后调用<code>c.L.Unlock()</code>释放<code>c.L</code>的锁；
	</li>
	<li>
		<p>
		然后使当前协程进入阻塞状态；<!--sup>(1)</sup-->
		</p>

		<p><i>
		（当前协程将被另一个协程通过<code>c.Signal()</code>或<code>c.Broadcast()</code>调用唤醒而重新进入运行状态。）
		</i></p>

		<p>
		</p>
		一旦当前协程重新进入运行状态，<code>c.L.Lock()</code>将被调用以试图重新获取<code>c.L</code>字段值的锁。
		此<code>c.Wait()</code>调用将在此试图成功之后退出。
	</li>
	</ol>
</li>
<li>
	一个<code>c.Signal()</code>调用将唤醒并移除<code>c</code>所维护的等待协程队列中的第一个协程（如果此队列不为空的话）。
</li>
<li>
	一个<code>c.Broadcast()</code>调用将唤醒并移除<code>c</code>所维护的等待协程队列中的所有协程（如果此队列不为空的话）。
</li>
</ul>

<p>
请注意：<code>c.Wait()</code>、<code>c.Signal()</code>和<code>c.Broadcast()</code>分别为<code>(&amp;c).Wait()</code>、<code>(&amp;c).Signal()</code>和<code>(&amp;c).Broadcast()</code>的简写形式。
</p>

<p>
<code>c.Signal()</code>和<code>c.Broadcast()</code>调用常用来通知某个条件的状态发生了变化。
一般说来，<code>c.Wait()</code>应该在一个检查某个条件是否已经得到满足的循环中调用。
</p>

<!--
<sup>(1)</sup> This is for the most common case,
by assuming that <code>c.Signal()</code> and <code>c.Broadcast()</code> are not called
in the short period between the step 2 and step 3.

<ul>
<li>
If <code>c.Signal()</code> is called in the short period between the step 2 and step 3,
then the caller goroutine of <code>c.Wait()</code> might not enter blocking state.
</li>
<li>
If <code>c.Broadcast()</code> is called in the short period between the step 2 and step 3,
then the caller goroutine of <code>c.Wait()</code> will not enter blocking state for sure.
</li>
</ul>
-->

下面是一个典型的<code>sync.Cond</code>用例。

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 10
	var values [N]string

	cond := sync.NewCond(&sync.Mutex{})

	for i := 0; i < N; i++ {
		d := time.Second * time.Duration(rand.Intn(10)) / 10
		go func(i int) {
			time.Sleep(d) // 模拟一个工作负载
			cond.L.Lock()
			// 下面的修改必须在cond.L被锁定的时候执行
			values[i] = string('a' + i)
			cond.Broadcast() // 可以在cond.L被解锁后发出通知
			cond.L.Unlock()
			// 上面的通知也可以在cond.L未锁定的时候发出。
			//cond.Broadcast() // 上面的调用也可以放在这里
		}(i)
	}

	// 此函数必须在cond.L被锁定的时候调用。
	checkCondition := func() bool {
		fmt.Println(values)
		for i := 0; i < N; i++ {
			if values[i] == "" {
				return false
			}
		}
		return true
	}

	cond.L.Lock()
	defer cond.L.Unlock()
	for !checkCondition() {
		cond.Wait() // 必须在cond.L被锁定的时候调用
	}
}
</code></pre>

一个可能的输出：
<pre class="output"><code>[         ]
[     f    ]
[  c   f    ]
[  c   f  h  ]
[ b c   f  h  ]
[a b c   f  h  j]
[a b c   f g h i j]
[a b c  e f g h i j]
[a b c d e f g h i j]
</code></pre>

<p>
因为上例中只有一个协程（主协程）在等待通知，所以其中的<code>cond.Broadcast()</code>调用也可以换为<code>cond.Signal()</code>。
如上例中的注释所示，<code>cond.Broadcast()</code>和<code>cond.Signal()</code>不必在<code>cond.L</code>的锁被成功获取的时候调用。
</p>

<p>
为了防止数据竞争，对自定义条件的修改必须在<code>cond.L</code>的锁被成功获取的时候才能执行。
另外，<code>checkCondition</code>函数和<code>cond.Wait</code>方法也必须在<code>cond.L</code>的锁被成功获取的时候才可被调用。
</p>

<p>
事实上，对于上面这个特定的例子，<code>cond.L</code>字段的也可以为一个<code>*sync.RWMutex</code>值。
对自定义条件的十个部分的修改可以在<code>RWMutex</code>值的读锁被成功获取的时候执行。这十个修改可以并发进行，因为它们是互不干扰的。
如下面的代码所示：
</p>

<pre class="line-numbers"><code class="language-go">...
	cond := sync.NewCond(&sync.RWMutex{})
	cond.L.Lock()

	for i := 0; i < N; i++ {
		d := time.Second * time.Duration(rand.Intn(10)) / 10
		go func(i int) {
			time.Sleep(d)
			cond.L.(*sync.RWMutex).RLock()
			values[i] = string('a' + i)
			cond.L.(*sync.RWMutex).RUnlock()
			cond.Signal()
		}(i)
	}
...
</code></pre>

<p>
在上面的代码中，此<code>sync.RWMutex</code>值的用法有些不符常规。
它的读锁被一些修改数组元素的协程所获取并持有，而它的写锁被主协程获取持有用来读取并检查各个数组元素的值。
</p>

<code>Cond</code>值所表示的自定义条件可以是一个虚无。对于这种情况，此<code>Cond</code>值纯粹被用来实现通知。
比如，下面这个程序将打印出<code>abc</code>或者<code>bac</code>。

<pre class="line-numbers"><code class="language-go">package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	cond := sync.NewCond(&sync.Mutex{})
	cond.L.Lock()
	go func() {
		cond.L.Lock()
		go func() {
			cond.L.Lock()
			cond.Broadcast()
			cond.L.Unlock()
		}()
		cond.Wait()
		fmt.Print("a")
		cond.L.Unlock()
		wg.Done()
	}()
	cond.Wait()
	fmt.Print("b")
	cond.L.Unlock()
	wg.Wait()
	fmt.Println("c")
}
</code></pre>

<p>
</p>

<p>
如果需要，多个<code>sync.Cond</code>值可以共享一个<code>sync.Locker</code>值。但是这种情形在实践中并不多见。
</p>
</div>
